package com.bw.func;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.common.GmallConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;


/**
 * @ClassName MyHBaseSinkFunctionNew
 * @Description TODO  存储Hbase
 * @Author SXLWTT 单新龙
 * @Date 2022/4
 * @Version 1.0
 **/
public class MyHBaseSinkFunctionNew extends RichSinkFunction<JSONObject> {

    private transient Connection conn = null;
    private transient Table table = null;


    @Override
    public void open(Configuration parameters) throws Exception {

        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        //链接服务器
        conf.set("hbase.zookeeper.quorum", GmallConfig.HBASE_ZOOKEEPER_QUORUM);
        conf.set("hbase.zookeeper.property.clientPort", GmallConfig.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT);
        if (null == conn) {
            this.conn = ConnectionFactory.createConnection(conf);
        }
    }

    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        //得到需要的原始数据 转化为object
        //获取到数据中的表与类型。将来能在数据中的HBASE字段的获取
        //加一个随机值
        int max=10000,min=1;//随机生成一个行键
        long randomNum = System.currentTimeMillis();
        int ran3 = (int) (randomNum%(max-min)+min);
        String pkRamdom ="id" + ran3 ;

        String table1 = value.getString("table");
        String type = value.getString("type");
        String data1 = value.getString("data");//JSONObject数据
        String sink_table = value.getString("sink_table");//往哪落地


        //获取其中的data 真实的数据是其中的表数据     JSONObject数据
        JSONObject data = JSON.parseObject(data1);
        //在data 中获取到对应的主键值，主键应该有，如果没有随机值产生一条
        String pkId = data.getString("id") ==null? pkRamdom:data.getString("id");//三元运算符 ==null 赋值   pkRamdom随机的主键
        //根据得到的json 数据进行解析
        //不同的table 转换为不同的对象 然后再进行添加
        JSONObject jsonObject1 = JSON.parseObject(data1);
        //表名    GMALL0820_REALTIME
        String habseSchema = GmallConfig.HBASE_SCHEMA;//GmallConfig  文件配置类  GMALL0820_REALTIME
        String tablename = habseSchema + "." +sink_table;

        TableName tableName = TableName.valueOf(tablename);//TableName转成tablename类型的表名
        // 获取表对象
        table = conn.getTable(tableName);
        //生成要插入的记录的主键
        Put put = new Put(Bytes.toBytes(pkId));
        // 列簇，列名，列值
        //一级目录表
      /*  if(table1.equals("base_category1")) {
            BaseCategory1 baseCategory1 = JSON.toJavaObject(jsonObject1, BaseCategory1.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(String.valueOf(baseCategory1.getName())));

        }
        //二级目录表
        if(table1.equals("base_category2")){
            BaseCategory2 baseCategory1 = JSON.toJavaObject(jsonObject1, BaseCategory2.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(String.valueOf(baseCategory1.getName())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("category1_id"), Bytes.toBytes(String.valueOf(baseCategory1.getCategory1_id())));
        }
        //三级目录表
        if(table1.equals("base_category3")){
            //     String id;String name;String ;
            BaseCategory3 baseCategory1 = JSON.toJavaObject(jsonObject1, BaseCategory3.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(String.valueOf(baseCategory1.getName())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("category2_id"), Bytes.toBytes(String.valueOf(baseCategory1.getCategory2_id())));
        }
        //状态表维护
        if(table1.equals("base_dic")){
            //     String id;String name;String ;
            BaseDic baseCategory1 = JSON.toJavaObject(jsonObject1, BaseDic.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("dic_name"), Bytes.toBytes(String.valueOf(baseCategory1.getDic_name())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("parent_code"), Bytes.toBytes(String.valueOf(baseCategory1.getParent_code())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("create_time"), Bytes.toBytes(String.valueOf(baseCategory1.getCreate_time())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("operate_time"), Bytes.toBytes(String.valueOf(baseCategory1.getOperate_time())));
        }
        //维度省份表
        if(table1.equals("base_province")){
            //     String id;String name;String ;
            BaseProvince baseCategory1 = JSON.toJavaObject(jsonObject1, BaseProvince.class);
            // String ;String ;String ;String ;String ;
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(String.valueOf(baseCategory1.getName())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("region_id"), Bytes.toBytes(String.valueOf(baseCategory1.getRegion_id())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("area_code"), Bytes.toBytes(String.valueOf(baseCategory1.getArea_code())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("iso_code"), Bytes.toBytes(String.valueOf(baseCategory1.getIso_code())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("iso_3166_2"), Bytes.toBytes(String.valueOf(baseCategory1.getIso_3166_2())));
        }
        // base_region 区域表
        if(table1.equals("base_region")){
            //     String id;String name;String ;
            BaseRegion baseCategory1 = JSON.toJavaObject(jsonObject1, BaseRegion.class);
            //String id;String region_name ;
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("region_name"), Bytes.toBytes(String.valueOf(baseCategory1.getRegion_name())));
            System.out.println("###################### +" + table1 +"@@@@@@@@@@@@@@@@@@@@@@@");
        }
        //品牌表维度

        if(table1.equals("base_trademark")){
            //     String id;String name;String ;
            BaseTrademark baseCategory1 = JSON.toJavaObject(jsonObject1, BaseTrademark.class);
            // String ;String ;String ;String ;String ;
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("tm_name"), Bytes.toBytes(String.valueOf(baseCategory1.getTm_name())));
            System.out.println("###################### +" + table1 +"@@@@@@@@@@@@@@@@@@@@@@@");
        }
        if(table1.equals("financial_sku_cost")){
            FinancialSkuCost baseCategory1 = JSON.toJavaObject(jsonObject1, FinancialSkuCost.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("sku_id"), Bytes.toBytes(String.valueOf(baseCategory1.getSku_id())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("sku_name"), Bytes.toBytes(String.valueOf(baseCategory1.getSku_name())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("is_lastest"), Bytes.toBytes(String.valueOf(baseCategory1.getIs_lastest())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("sku_cost"), Bytes.toBytes(String.valueOf(baseCategory1.getSku_cost())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("create_time"), Bytes.toBytes(String.valueOf(baseCategory1.getCreate_time())));

            System.out.println("###################### +" + table1 +"@@@@@@@@@@@@@@@@@@@@@@@");
        }

        //优惠卷信息维度表
        if(table1.equals("coupon_info")){
            //     String id;String name;String ;
            CouponInfo baseCategory1 = JSON.toJavaObject(jsonObject1, CouponInfo.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("coupon_name"), Bytes.toBytes(String.valueOf(baseCategory1.getCoupon_name())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("coupon_type"), Bytes.toBytes(String.valueOf(baseCategory1.getCoupon_type())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("condition_amount"), Bytes.toBytes(String.valueOf(baseCategory1.getCondition_amount())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("condition_num"), Bytes.toBytes(String.valueOf(baseCategory1.getCondition_num())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("activity_id"), Bytes.toBytes(String.valueOf(baseCategory1.getActivity_id())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("benefit_amount"), Bytes.toBytes(String.valueOf(baseCategory1.getBenefit_amount())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("benefit_discount"), Bytes.toBytes(String.valueOf(baseCategory1.getBenefit_discount())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("create_time"), Bytes.toBytes(String.valueOf(baseCategory1.getCreate_time())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("range_type"), Bytes.toBytes(String.valueOf(baseCategory1.getRange_type())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("limit_num"), Bytes.toBytes(String.valueOf(baseCategory1.getLimit_num())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("taken_count"), Bytes.toBytes(String.valueOf(baseCategory1.getTaken_count())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("start_time"), Bytes.toBytes(String.valueOf(baseCategory1.getStart_time())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("end_time"), Bytes.toBytes(String.valueOf(baseCategory1.getEnd_time())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("operate_time"), Bytes.toBytes(String.valueOf(baseCategory1.getOperate_time())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("expire_time"), Bytes.toBytes(String.valueOf(baseCategory1.getExpire_time())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("range_desc"), Bytes.toBytes(String.valueOf(baseCategory1.getRange_desc())));
            System.out.println("###################### +" + table1 +"@@@@@@@@@@@@@@@@@@@@@@@");
        }
        //获取优惠卷区间维度表
        if(table1.equals("coupon_range")){
            //     String ;String ;String ;
            CouponRange baseCategory1 = JSON.toJavaObject(jsonObject1, CouponRange.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("coupon_id"), Bytes.toBytes(String.valueOf(baseCategory1.getCoupon_id())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("range_type"), Bytes.toBytes(String.valueOf(baseCategory1.getRange_type())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("range_id"), Bytes.toBytes(String.valueOf(baseCategory1.getRange_id())));
            System.out.println("###################### +" + table1 +"@@@@@@@@@@@@@@@@@@@@@@@");
        }
        // stock keep unit 库存量单位
        if(table1.equals("sku_info")){
            SkuInfo baseCategory1 = JSON.toJavaObject(jsonObject1, SkuInfo.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("spu_id"), Bytes.toBytes(String.valueOf(baseCategory1.getSpu_id ())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("price"), Bytes.toBytes(String.valueOf(baseCategory1.getPrice())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("sku_name"), Bytes.toBytes(String.valueOf(baseCategory1.getSku_name())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("sku_desc"), Bytes.toBytes(String.valueOf(baseCategory1.getSku_desc())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("weight"), Bytes.toBytes(String.valueOf(baseCategory1.getWeight())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("category3_id"), Bytes.toBytes(String.valueOf(baseCategory1.getCategory3_id())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("sku_default_img"), Bytes.toBytes(String.valueOf(baseCategory1.getSku_default_img())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("is_sale"), Bytes.toBytes(String.valueOf(baseCategory1.getIs_sale())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("create_time"), Bytes.toBytes(String.valueOf(baseCategory1.getCreate_time())));
            System.out.println("###################### +" + table1 +"@@@@@@@@@@@@@@@@@@@@@@@");
        }

        if(table1.equals("spu_info")){
            SpuInfo baseCategory1 = JSON.toJavaObject(jsonObject1, SpuInfo.class);
            // String ;String ;String ;String  ;
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("spu_name"), Bytes.toBytes(String.valueOf(baseCategory1.getSpu_name ())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("description"), Bytes.toBytes(String.valueOf(baseCategory1.getDescription())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("category3_id"), Bytes.toBytes(String.valueOf(baseCategory1.getCategory3_id())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("tm_id"), Bytes.toBytes(String.valueOf(baseCategory1.getTm_id())));
            System.out.println("###################### +" + table1 +"@@@@@@@@@@@@@@@@@@@@@@@");
        }
        // 用户信息表
        if(table1.equals("user_info")){//user_info没有初始化   导致订单宽表业务  存入ClickHoust数据少  包空指针
            UserInfo baseCategory1 = JSON.toJavaObject(jsonObject1, UserInfo.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("login_name"), Bytes.toBytes(String.valueOf(baseCategory1.getLogin_name ())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(String.valueOf(baseCategory1.getName())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("user_level"), Bytes.toBytes(String.valueOf(baseCategory1.getUser_level())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("birthday"), Bytes.toBytes(String.valueOf(baseCategory1.getBirthday())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("gender"), Bytes.toBytes(String.valueOf(baseCategory1.getGender())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("create_time"), Bytes.toBytes(String.valueOf(baseCategory1.getCreate_time())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("operate_time"), Bytes.toBytes(String.valueOf(baseCategory1.getOperate_time())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("email"), Bytes.toBytes(String.valueOf(baseCategory1.getEmail())));

            System.out.println("###################### +" + table1 +"@@@@@@@@@@@@@@@@@@@@@@@");
        }
        if(table1.equals("activity_info")){
            ActivityInfo baseCategory1 = JSON.toJavaObject(jsonObject1,    ActivityInfo.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("activity_name"), Bytes.toBytes(String.valueOf(baseCategory1.getActivity_name ())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("activity_type"), Bytes.toBytes(String.valueOf(baseCategory1.getActivity_type())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("activity_desc"), Bytes.toBytes(String.valueOf(baseCategory1.getActivity_desc())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("start_time"), Bytes.toBytes(String.valueOf(baseCategory1.getStart_time())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("end_time"), Bytes.toBytes(String.valueOf(baseCategory1.getEnd_time())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("create_time"), Bytes.toBytes(String.valueOf(baseCategory1.getCreate_time())));
            System.out.println("###################### +" + table1 +"@@@@@@@@@@@@@@@@@@@@@@@");
        }
        //活动规则
        if(table1.equals("activity_rule")){
            ActivityRule baseCategory1 = JSON.toJavaObject(jsonObject1,    ActivityRule.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("activity_id"), Bytes.toBytes(String.valueOf(baseCategory1.getActivity_id ())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("activity_type"), Bytes.toBytes(String.valueOf(baseCategory1.getActivity_type())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("condition_amount"), Bytes.toBytes(String.valueOf(baseCategory1.getCondition_amount())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("condition_num"), Bytes.toBytes(String.valueOf(baseCategory1.getCondition_num())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("benefit_amount"), Bytes.toBytes(String.valueOf(baseCategory1.getBenefit_amount())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("benefit_discount"), Bytes.toBytes(String.valueOf(baseCategory1.getBenefit_discount())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("benefit_level"), Bytes.toBytes(String.valueOf(baseCategory1.getBenefit_level())));
            System.out.println("###################### +" + table1 +"@@@@@@@@@@@@@@@@@@@@@@@");
        }
        //活动库存表
        if(table1.equals("activity_sku")){
            ActivitySku baseCategory1 = JSON.toJavaObject(jsonObject1,    ActivitySku.class);
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("activity_id"), Bytes.toBytes(String.valueOf(baseCategory1.getActivity_id ())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("sku_id"), Bytes.toBytes(String.valueOf(baseCategory1.getSku_id())));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("create_time"), Bytes.toBytes(String.valueOf(baseCategory1.getCreate_time())));
            System.out.println("###################### +" + table1 +"@@@@@@@@@@@@@@@@@@@@@@@");
        }
        if (put.isEmpty()){
            System.out.println("no args");
        }else {
            table.put(put);
        }*/
    }

    @Override
    public void close() throws Exception {


        if (table != null){
            table.close();
        }

        if (conn != null){
            conn.close();
        }
    }
}
